//purpose: bitmap management for lsv; lower-level I/O implementation for the bitmap.

//file created by zsy on 2017.03.5

#include <unistd.h>
#include <stdio.h>
#include <pthread.h>

#include "lsv_conf.h"
#include "list.h"
#include "lsv_volume.h"
#include "lsv_bitmap.h"
#include "lsv_bitmap_internal.h"
#include "lsv_types.h"
#include "lsv_help.h"
#include "schedule.h"

int lsv_bitmap_is_row2(void *volume_context)
{
        return ((lsv_volume_proto_t *)volume_context)->volume_format == VOLUME_FORMAT_ROW2 ? 1:0;
}

/**
 * @note 数据chunk不要清零，元数据分配需要清零，COW的不需要清零
 *
 * @param volume_context
 * @param chunk_id
 * @param zero
 * @return
 */
int lsv_bitmap_alloc_chunk(void *volume_context, uint32_t *chunk_id, int zero)
{
        int ret = lsv_volume_chunk_malloc((lsv_volume_proto_t *)volume_context,
                        LSV_BITMAP_STORAGE_TYPE, //lsv_u8_t type,
                        chunk_id);
        if(ret)
        {
                //fatal error.
                return ret;
        }

        if(zero)
        {
                uint8_t *pbuf = xmalloc(LSV_CHUNK_SIZE);
                memset(pbuf, 0, LSV_CHUNK_SIZE);
                ret = lsv_bitmap_write_chunk(volume_context, *chunk_id, 0, LSV_CHUNK_SIZE, pbuf);
                if(ret)
                        lsv_volume_chunk_free((lsv_volume_proto_t *)volume_context, LSV_BITMAP_STORAGE_TYPE, *chunk_id);

                xfree(pbuf);
        }

        return ret;
}

/**
 * Read part of a chunk through lsv_volume_chunk_read_data.
 *
 * @param volume_context opaque pointer, interpreted as lsv_volume_proto_t *
 * @param vvol_id        when non-zero, translated to a volume id with
 *                       lsv_bitmap_vvol_to_vol; when zero, volume id 0 is used
 * @param chunk_id       chunk to read from
 * @param chunk_off      offset inside the chunk
 * @param len            number of bytes to read
 * @param buf            destination buffer
 * @return the non-zero code from lsv_bitmap_vvol_to_vol if the translation
 *         fails, otherwise the result of lsv_volume_chunk_read_data
 */
int lsv_bitmap_read_chunk(void *volume_context, uint32_t vvol_id, uint32_t chunk_id, uint32_t chunk_off,
                          uint32_t len, void *buf)
{
        uint64_t vol_id = 0;

        if (vvol_id != 0) {
                int rc = lsv_bitmap_vvol_to_vol(volume_context, vvol_id, &vol_id);

                if (rc != 0)
                        return rc;
        }

        LSV_DBUG("lsv bitmap read chunk from vol: %lld, chunk_id:%u,chunk_off:%u %u\n", (LLU)vol_id,
              chunk_id, chunk_off, len);

        return lsv_volume_chunk_read_data((lsv_volume_proto_t *)volume_context, vol_id,
                        chunk_id, chunk_off, len, buf);
}

int lsv_bitmap_write_chunk(void *volume_context, uint32_t chunk_id, uint32_t chunk_off, uint32_t len, void *buf)
{
        //int crc = testcrc(buf,len);
        //printf("write chunk, id=%d, len=%d, crc=%x\r\n", chunk_id, len, crc);

        /*{
                uint8_t *pb = (uint8_t *)buf;
                int i;

                for(i=0;i<len;i++)
                        if(pb[i] != 0xff)
                                break;
                assert(i < len /2 );
        }*/

        LSV_DBUG("lsv bitmap write chunk: chunk_id:%u,chunk_off:%u,length:%u\n", chunk_id, chunk_off, len);

        lsv_volume_io_t vio;
        lsv_volume_io_init(&vio, chunk_id, chunk_off, len, LSV_BITMAP_STORAGE_TYPE);

        return lsv_volume_chunk_update((lsv_volume_proto_t *)volume_context, &vio, buf);
}

/**
 * 混合存储时，区分元数据和数据的存储分层：元数据写入SSD，数据写入HDD
 *
 * @param volume_context
 * @param chunk_id
 * @param chunk_off
 * @param len
 * @param buf
 * @return
 */
int lsv_bitmap_write_data(void *volume_context, uint32_t chunk_id, uint32_t chunk_off, uint32_t len, void *buf)
{
        LSV_DBUG("chunk_id:%u,chunk_off:%u\n", chunk_id, chunk_off);

        lsv_volume_io_t vio;
        lsv_volume_io_init(&vio, chunk_id, chunk_off, len, LSV_LOG_LOG_STORAGE_TYPE);

        return lsv_volume_chunk_update((lsv_volume_proto_t *)volume_context, &vio, buf);
}

/**
 * Completion callback for an asynchronous bitmap page read (handed to
 * lsv_volume_chunk_read_callback by lsv_bitmap_read_chunk_async).
 *
 * Releases the entry's lock, marks the pages covered by the I/O as valid,
 * reports slow loads, clears the pages' "load" state, optionally drops the
 * page lock, and finally frees the entry.
 *
 * @param rw_rc completion status of the read; currently ignored
 * @param arg   the aio_list_entry_t describing the request; freed here
 */
void lsv_bitmap_read_cb_callback(lsv_s32_t rw_rc, void*arg)
{
        /*
         * NOTE(review): the completion status is discarded, so the pages are
         * marked valid even if the read failed. Confirm the meaning of rw_rc
         * (presumably 0 == success) and decide how a failed load should be
         * surfaced before relying on the cached data.
         */
        (void) rw_rc;
        aio_list_entry_t *pentry = (aio_list_entry_t *)arg;
        lsv_lock_t *plock = &pentry->lock;

        lsv_unlock(plock);

        /* vio.offset / vio.size are byte values; convert to page index / page count. */
        lsv_bitmap_cache_mark_valid(pentry->cache_unit, pentry->vio.offset / PAGE_SIZE, pentry->vio.size / PAGE_SIZE);

#if LSV_BITMAP_CHECK_BITMAP_LBA

        /* Debug check: one bitmap unit per page must not carry the all-ones LBA. */
        lsv_bitmap_unit_t *pbitmap = (lsv_bitmap_unit_t *)(pentry->cache_unit->bitmap_cache + pentry->vio.offset);
        for(int i=0;i<pentry->vio.size/PAGE_SIZE;i++)
        {
                assert(pbitmap->lba != (uint64_t)0xffffffffffffffff);

                pbitmap++;
        }

#endif

        /* Report loads slower than 20000 time units (presumably microseconds - confirm). */
        struct timeval t2;
        _gettimeofday(&t2, NULL);
        int64_t used = _time_used(&pentry->tim, &t2);

        if(used > 20000)
                DERROR("bitmap load page time used %lld\r\n", (LLU)used);

        lsv_bitmap_cache_clear_load(pentry->cache_unit, pentry->vio.offset / PAGE_SIZE, pentry->vio.size / PAGE_SIZE);
#if LSV_BITMAP_USE_PAGE_LOCK
        lsv_bitmap_cache_page_unlock(pentry->cache_unit, pentry->vio.offset / PAGE_SIZE, pentry->vio.size / PAGE_SIZE);
#endif

        /* The entry is owned by this callback once the read has completed. */
        xfree(pentry);
}

/**
 * Start an asynchronous chunk read whose completion is delivered to
 * lsv_bitmap_read_cb_callback.
 *
 * @param volume_context opaque pointer, interpreted as lsv_volume_proto_t *
 * @param buf            destination buffer
 * @param io             description of the chunk range to read
 * @param param          argument forwarded to the completion callback
 * @return the result of lsv_volume_chunk_read_callback
 */
int lsv_bitmap_read_chunk_async(void *volume_context, void *buf, lsv_volume_io_t *io, void *param)
{
        lsv_volume_proto_t *volume = (lsv_volume_proto_t *)volume_context;

        return lsv_volume_chunk_read_callback(volume, io, buf, lsv_bitmap_read_cb_callback, param);
}

/**
 * Completion callback for an asynchronous bitmap page write (handed to
 * lsv_volume_chunk_update_callback by lsv_bitmap_write_chunk_async).
 *
 * Releases the entry's lock, clears the dirty state of the pages covered by
 * the I/O, reports slow commits and optionally drops the page lock.
 *
 * @param rw_rc completion status of the write; currently ignored
 * @param arg   the aio_list_entry_t describing the request
 */
void lsv_bitmap_write_cb_callback(lsv_s32_t rw_rc, void*arg)
{
        /*
         * NOTE(review): the completion status is discarded, so the pages are
         * marked clean even if the write failed. Confirm the meaning of rw_rc
         * (presumably 0 == success) and decide whether a failed commit should
         * leave the pages dirty.
         */
        (void) rw_rc;
        aio_list_entry_t *pentry = (aio_list_entry_t *)arg;
        lsv_lock_t *plock = &pentry->lock;

        lsv_unlock(plock);

        /* vio.offset / vio.size are byte values; convert to page index / page count. */
        lsv_bitmap_cache_clear_dirty(pentry->cache_unit, pentry->vio.offset / PAGE_SIZE, pentry->vio.size / PAGE_SIZE);

        /* Report commits slower than 20000 time units (presumably microseconds - confirm). */
        struct timeval t2;
        _gettimeofday(&t2, NULL);
        int64_t used = _time_used(&pentry->tim, &t2);

        if(used > 20000)
                DERROR("bitmap commit time used %lld\r\n", (LLU)used);

#if LSV_BITMAP_USE_PAGE_LOCK
        lsv_bitmap_cache_page_unlock(pentry->cache_unit, pentry->vio.offset / PAGE_SIZE, pentry->vio.size / PAGE_SIZE);
#endif

        /*
         * NOTE(review): unlike the read completion callback, the entry is not
         * freed here; presumably the submitter owns and releases it - confirm.
         */
}

/**
 * Start an asynchronous chunk write whose completion is delivered to
 * lsv_bitmap_write_cb_callback.
 *
 * @param volume_context opaque pointer, interpreted as lsv_volume_proto_t *
 * @param buf            source buffer
 * @param io             description of the chunk range to write
 * @param param          argument forwarded to the completion callback
 * @return the result of lsv_volume_chunk_update_callback
 */
int lsv_bitmap_write_chunk_async(void *volume_context, void *buf, lsv_volume_io_t *io, void *param)
{
        lsv_volume_proto_t *volume = (lsv_volume_proto_t *)volume_context;

        return lsv_volume_chunk_update_callback(volume, io, buf, lsv_bitmap_write_cb_callback, param);
}

/**
 * Placeholder: performs no replication and no write.
 *
 * Every argument is ignored.
 *
 * @return always 0
 */
int lsv_bitmap_replicate_and_write_chunk(struct lsv_bitmap_context *bitmap_context, uint32_t chunk_id, uint32_t chunk_off, uint32_t len, void *buf)
{
        /* Not implemented: silence unused-parameter warnings. */
        (void) buf;
        (void) len;
        (void) chunk_off;
        (void) chunk_id;
        (void) bitmap_context;

        return 0;
}

int lsv_bitmap_free_chunk(void *volume_context, uint32_t chunk_id)
{
        return lsv_volume_chunk_free((lsv_volume_proto_t *)volume_context, LSV_BITMAP_STORAGE_TYPE,chunk_id);
}

/**
 * Initialise the locks embedded in a bitmap context.
 *
 * Both node->rwlock and node->cow_lock are set up with lsv_rwlock_init.
 *
 * @param node bitmap context whose locks are initialised
 */
void lsv_bitmap_init_lock(lsv_bitmap_context_t *node)
{
        /* Disabled earlier variant: a recursive pthread mutex on node->rwlock. */
        /*pthread_mutexattr_t attr;
          pthread_mutexattr_init(&attr);
          pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE_NP);

          pthread_mutex_init(&node->rwlock, &attr);*/

        lsv_rwlock_init(&node->rwlock);
        lsv_rwlock_init(&node->cow_lock);
}

/**
 * Destroy the locks embedded in a bitmap context.
 *
 * Counterpart of lsv_bitmap_init_lock: every lock initialised there
 * (rwlock and cow_lock) is destroyed here, so that cow_lock is no longer
 * left behind when a context is torn down.
 *
 * @param node bitmap context whose locks are destroyed
 */
void lsv_bitmap_free_lock(lsv_bitmap_context_t *node)
{
        lsv_rwlock_destroy(&node->rwlock);
        /* NOTE(review): confirm cow_lock is not already destroyed elsewhere. */
        lsv_rwlock_destroy(&node->cow_lock);
}

void lsv_bitmap_wlock(void *volume_context)
{
        lsv_bitmap_context_t *root =((lsv_volume_proto_t *)volume_context)->bitmap_context;

        lsv_wrlock(&root->rwlock);
}

void lsv_bitmap_rlock(void *volume_context)
{
#ifdef CACHE_NEW
        return ;
#else
        lsv_bitmap_context_t *root =((lsv_volume_proto_t *)volume_context)->bitmap_context;

        #if LSV_BITMAP_USE_GLOBAL_LOCK
                lsv_wrlock(&root->rwlock);
        #else
        // TODO core: resize if io
                lsv_rdlock(&root->rwlock);
        #endif
#endif
}

void lsv_bitmap_unlock(void *volume_context)
{
#ifdef CACHE_NEW
        return;
#else
        lsv_bitmap_context_t *root =((lsv_volume_proto_t *)volume_context)->bitmap_context;

        lsv_rwunlock(&root->rwlock);
#endif
}

/**
 * Per-node lock hook; currently a no-op.
 *
 * @param node bitmap context (unused)
 */
void lsv_bitmap_lock_node(lsv_bitmap_context_t *node)
{
        /* No per-node locking is performed; the cast only silences the unused-parameter warning. */
        (void)node;
}

/**
 * Per-node unlock hook; currently a no-op.
 *
 * @param node bitmap context (unused)
 */
void lsv_bitmap_unlock_node(lsv_bitmap_context_t *node)
{
        /* No per-node locking is performed; the cast only silences the unused-parameter warning. */
        (void)node;
}

/**
 * Task body that loads one bitmap header and fans out to its child and
 * sibling headers. Scheduled by bitmap_header_load_async.
 *
 * Steps:
 *  1. Load this node's header from the chunk id stored in
 *     node->bitmap_header->reserved2[0].
 *  2. If the header names a first child chunk, create a node for it, link it
 *     as node->first_child and schedule its load.
 *  3. If the header names a next chunk, create a node for it, link it as
 *     node->next (same parent) and schedule its load.
 *  4. If the root header's active_chunk_id equals this header's
 *     header_chunk_ids[0], record this node as the root's active node.
 *  5. Decrement the root's reserved2[2] counter; when it reaches -1 the
 *     root's is_loading flag is cleared.
 *
 * @param arg the struct lsv_bitmap_context to load
 */
void bitmap_header_load_callback(void *arg)
{
        struct lsv_bitmap_context *node = (struct lsv_bitmap_context *)arg;
        lsv_volume_proto_t *volume = (lsv_volume_proto_t *)node->volume_context;
        // NOTE(review): bitmap_header_load_async falls back to `node` when
        // volume->bitmap_context is NULL; no such fallback exists here, so this
        // presumably relies on bitmap_context being set before the task runs - confirm.
        struct lsv_bitmap_context *bitmap_root = (struct lsv_bitmap_context *)volume->bitmap_context;
        struct lsv_bitmap_header *pheader = BITMAP_HEADER(node);

        // reserved2[0] carries the chunk id stored by bitmap_header_load_async.
        int ret = bitmap_header_load_internal(node, node->bitmap_header->reserved2[0]);
        if(ret)
        {
                // Failure is recorded as is_loading = 2 and logged.
                // NOTE(review): execution continues and pheader is still consulted
                // below even though the load failed - confirm this is intended.
                bitmap_root->is_loading = 2;
                DERROR("Fatal in loading bitmap header\r\n");
        }

        if(pheader->first_child_chunk_id)
        {
                struct lsv_bitmap_context *bitmap_node;

                ret = lsv_bitmap_create_node(bitmap_root->volume_context, &bitmap_node, LSV_BITMAP_FALSE, 0);
                // NOTE(review): this early return skips the reserved2[2] decrement
                // below, so the completion condition may never be reached - confirm.
                if(ret)
                        return;

                        // The current node is the parent of the new node.
                bitmap_node->parent = node;
                node->first_child = bitmap_node;

                bitmap_header_load_async(bitmap_node, pheader->first_child_chunk_id);
        }

        if(pheader->next_chunk_id)
        {
                struct lsv_bitmap_context *bitmap_node;

                LSV_DBUG("Loading brother node, thunkid=%d.", pheader->next_chunk_id);
                ret = lsv_bitmap_create_node(bitmap_root->volume_context, &bitmap_node, LSV_BITMAP_FALSE, 0);
                // NOTE(review): same as above - returning here skips the decrement.
                if(ret)
                        return;

                // Sibling node: it shares this node's parent.
                bitmap_node->parent = node->parent;
                node->next = bitmap_node;

                bitmap_header_load_async(bitmap_node, pheader->next_chunk_id);
                LSV_DBUG("Loading brother node ok.");
        }

        // Remember which node corresponds to the root header's active chunk.
        if(bitmap_root->bitmap_header->active_chunk_id == pheader->header_chunk_ids[0])
                bitmap_root->active = node;

        // reserved2[2] is incremented per scheduled load (see
        // bitmap_header_load_async) and decremented here per finished load.
        bitmap_root->bitmap_header->reserved2[2] --;

        LSV_DBUG("Loading snapshot headers, count = %d/%d\r\n", bitmap_root->bitmap_header->reserved2[2], bitmap_root->bitmap_header->reserved2[1]);
        // NOTE(review): completion is detected at -1 rather than 0; this presumably
        // depends on the counter's initial value, which is not visible here - confirm.
        if(bitmap_root->bitmap_header->reserved2[2] == -1)
        {
                LSV_DBUG("Snapshots loading completed.\r\n");

                bitmap_root->is_loading = 0;
        }
}

/**
 * Schedule the load of one bitmap header as a separate task.
 *
 * The chunk id is parked in node->bitmap_header->reserved2[0] for
 * bitmap_header_load_callback to pick up, and the root header's
 * reserved2[1] and reserved2[2] counters are each incremented.
 *
 * @param node           bitmap context whose header is to be loaded
 * @param first_chunk_id chunk id the header is loaded from
 * @return always 0
 */
int private bitmap_header_load_async(struct lsv_bitmap_context *node, uint32_t first_chunk_id)
{
        lsv_volume_proto_t *vol = (lsv_volume_proto_t *)node->volume_context;
        struct lsv_bitmap_context *root = (struct lsv_bitmap_context *)vol->bitmap_context;

        /* No root registered on the volume yet: this node is the first one. */
        if (!root)
                root = node;

        node->bitmap_header->reserved2[0] = first_chunk_id;

        root->bitmap_header->reserved2[1] += 1;
        root->bitmap_header->reserved2[2] += 1;

        schedule_task_new("lsv_bimap_load_header", bitmap_header_load_callback, node, -1);

        return 0;
}
